package com.fwmagic.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class Hbase1xDemo {

    // Connection to the HBase cluster; opened in init() and closed in close().
    Connection connection;

    // Client configuration that init() builds and passes to ConnectionFactory.
    Configuration conf;

    // Admin handle used for table metadata operations (create / modify / drop).
    Admin admin;

    /**
     * Builds the client configuration, opens the cluster connection and obtains the
     * admin handle before each test.
     *
     * <p>Only the ZooKeeper quorum is configured here; the connection is created from
     * that configuration alone.
     *
     * @throws Exception if the connection or the admin handle cannot be created
     */
    @Before
    public void init() throws Exception {
        // ZooKeeper ensemble used by the target HBase cluster.
        final String zkQuorum = "hd1:2181,hd2:2181,hd3:2181";

        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zkQuorum);

        connection = ConnectionFactory.createConnection(conf);
        admin = connection.getAdmin();
    }

    /**
     * Releases the admin handle and the cluster connection after each test.
     *
     * <p>Either field may still be {@code null} if {@code init()} failed part-way, so both
     * are null-checked; the connection is closed even when closing the admin throws.
     *
     * @throws Exception if closing either resource fails
     */
    @After
    public void close() throws Exception {
        try {
            if (admin != null) {
                admin.close();
            }
        } finally {
            // Always release the connection, regardless of how admin.close() ended.
            if (connection != null) {
                connection.close();
            }
        }
    }


    /**
     * Creates table {@code t_user_info2} with the column families {@code base_info}
     * and {@code extra_info}.
     *
     * @throws Exception if the admin call fails
     */
    @Test
    public void createTable() throws Exception {
        System.out.println("------创建表 START-----");

        // Descriptor of the table to create.
        HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf("t_user_info2"));

        // First column family, left at its default settings.
        descriptor.addFamily(new HColumnDescriptor("base_info"));

        // Second column family: ROW bloom filter and versions configured as (1, 3).
        HColumnDescriptor extraInfo = new HColumnDescriptor("extra_info");
        extraInfo.setBloomFilterType(BloomType.ROW);
        extraInfo.setVersions(1, 3);
        descriptor.addFamily(extraInfo);

        admin.createTable(descriptor);

        System.out.println("----创建表END-----");
    }

    /**
     * Disables and then deletes table {@code t_user_info2}.
     *
     * @throws Exception if either admin call fails
     */
    @Test
    public void dropTable() throws Exception {
        TableName target = TableName.valueOf("t_user_info2");

        // The table is disabled first and deleted afterwards.
        admin.disableTable(target);
        admin.deleteTable(target);
    }

    /**
     * Alters the definition of table {@code t_user_info2}: switches the bloom filter of
     * {@code extra_info} to ROWCOL and adds a new column family {@code other_info}.
     *
     * @throws Exception if reading or modifying the table descriptor fails
     */
    @Test
    public void testModify() throws Exception {
        TableName target = TableName.valueOf("t_user_info2");
        HTableDescriptor descriptor = admin.getTableDescriptor(target);

        // Change an existing column family in place.
        descriptor.getFamily("extra_info".getBytes()).setBloomFilterType(BloomType.ROWCOL);

        // Append a brand-new column family.
        descriptor.addFamily(new HColumnDescriptor("other_info"));

        admin.modifyTable(target, descriptor);
    }

    /**
     * Writes a single row (row key {@code oid1}) into table {@code orderb}, filling
     * columns in the {@code data} and {@code offset} families.
     *
     * <p>The table handle is opened in try-with-resources so it is released even when
     * the put fails.
     *
     * @throws Exception if the table cannot be opened or the put fails
     */
    @Test
    public void testPutSingle() throws Exception {
        // Row-key prefix; the row key and the order_id value are both oid + "1".
        String oid = "oid";
        byte[] dataFamily = Bytes.toBytes("data");
        byte[] offsetFamily = Bytes.toBytes("offset");

        try (Table table = connection.getTable(TableName.valueOf("orderb"))) {
            Put put = new Put(Bytes.toBytes(oid + "1"));
            put.addColumn(dataFamily, Bytes.toBytes("order_id"), Bytes.toBytes(oid + "1"));
            // Numeric values are passed as int literals, i.e. Bytes.toBytes(int).
            put.addColumn(dataFamily, Bytes.toBytes("total_money"), Bytes.toBytes(1000));

            put.addColumn(offsetFamily, Bytes.toBytes("topic"), Bytes.toBytes("wc"));
            put.addColumn(offsetFamily, Bytes.toBytes("partition"), Bytes.toBytes(0));
            put.addColumn(offsetFamily, Bytes.toBytes("offset"), Bytes.toBytes(5));

            table.put(put);
        }
    }

    /**
     * Writes a batch of sample rows into table {@code t_user_info2}.
     *
     * <p>The table handle is opened in try-with-resources so it is released even when
     * the batch put fails. Strings are encoded with {@code Bytes.toBytes} (UTF-8) rather
     * than the platform default charset.
     *
     * @throws Exception if the table cannot be opened or the batch put fails
     */
    @Test
    public void testPut() throws Exception {
        byte[] baseInfo = Bytes.toBytes("base_info");
        byte[] extraInfo = Bytes.toBytes("extra_info");
        byte[] username = Bytes.toBytes("username");
        byte[] password = Bytes.toBytes("password");
        byte[] married = Bytes.toBytes("married");
        byte[] notMarried = Bytes.toBytes("false");

        List<Put> puts = new ArrayList<>();

        // Each Put is bound to one row key; several Puts may target the same row.
        Put put01 = new Put(Bytes.toBytes("user001"));
        put01.addColumn(baseInfo, username, Bytes.toBytes("zhangsan"));
        puts.add(put01);

        Put put02 = new Put(Bytes.toBytes("user001"));
        put02.addColumn(baseInfo, password, Bytes.toBytes("123456"));
        puts.add(put02);

        Put put03 = new Put(Bytes.toBytes("user002"));
        put03.addColumn(baseInfo, username, Bytes.toBytes("lisi"));
        put03.addColumn(extraInfo, married, notMarried);
        puts.add(put03);

        Put put04 = new Put(Bytes.toBytes("zhang_sh_01"));
        put04.addColumn(baseInfo, username, Bytes.toBytes("zhang01"));
        put04.addColumn(extraInfo, married, notMarried);
        puts.add(put04);

        Put put05 = new Put(Bytes.toBytes("zhang_sh_02"));
        put05.addColumn(baseInfo, username, Bytes.toBytes("zhang02"));
        put05.addColumn(extraInfo, married, notMarried);
        puts.add(put05);

        Put put06 = new Put(Bytes.toBytes("liu_sh_01"));
        put06.addColumn(baseInfo, username, Bytes.toBytes("liu01"));
        put06.addColumn(extraInfo, married, notMarried);
        puts.add(put06);

        Put put07 = new Put(Bytes.toBytes("zhang_bj_01"));
        put07.addColumn(baseInfo, username, Bytes.toBytes("zhang03"));
        put07.addColumn(extraInfo, married, notMarried);
        puts.add(put07);

        // NOTE(review): same row key as put07, so this writes the same cells again with
        // a different username. Possibly meant to be "zhang_bj_02" - confirm intent.
        Put put08 = new Put(Bytes.toBytes("zhang_bj_01"));
        put08.addColumn(baseInfo, username, Bytes.toBytes("zhang04"));
        put08.addColumn(extraInfo, married, notMarried);
        puts.add(put08);

        try (Table table = connection.getTable(TableName.valueOf("t_user_info2"))) {
            table.put(puts);
        }
    }

    /**
     * Reads one row ({@code user001}) from table {@code t_user_info2} and prints every
     * cell as {@code family:qualifier  value}.
     *
     * <p>The table handle is closed via try-with-resources, and cell bytes are decoded
     * with {@code Bytes.toString} (UTF-8) instead of the platform default charset.
     *
     * @throws Exception if the table cannot be opened or the get fails
     */
    @Test
    public void testGet() throws Exception {
        try (Table table = connection.getTable(TableName.valueOf("t_user_info2"))) {
            // A Get addresses exactly one row by its row key.
            Get get = new Get(Bytes.toBytes("user001"));
            Result result = table.get(get);

            CellScanner cellScanner = result.cellScanner();
            while (cellScanner.advance()) {
                Cell current = cellScanner.current();
                // The backing arrays are shared; always slice with offset + length.
                String family = Bytes.toString(
                        current.getFamilyArray(), current.getFamilyOffset(), current.getFamilyLength());
                String qualifier = Bytes.toString(
                        current.getQualifierArray(), current.getQualifierOffset(), current.getQualifierLength());
                String value = Bytes.toString(
                        current.getValueArray(), current.getValueOffset(), current.getValueLength());

                System.out.println(family + ":" + qualifier + "  " + value);
            }
        }
    }


    /**
     * Reads several rows from table {@code PHONE_LABLE_BASE} in one batched get, prints
     * every cell as {@code family:qualifier:value} and reports the elapsed time.
     *
     * <p>Rows that do not exist are detected with {@code Result.isEmpty()}, because the
     * backing cell array of an empty result may be {@code null}. The table handle is
     * closed via try-with-resources.
     *
     * @throws Exception if the table cannot be opened or the batched get fails
     */
    @Test
    public void testGetList() throws Exception {
        try (Table table = connection.getTable(TableName.valueOf("PHONE_LABLE_BASE"))) {
            long start = System.currentTimeMillis();

            // Row keys to fetch; one Get is built per key.
            List<String> rowKeys = Arrays.asList(
                    "0006f702659d76ed4824312625227cfc",
                    "00062797bf8c4058fd003efe105f0989",
                    "0005d44e9f4d53692d7cec384d51ab6e");
            List<Get> gets = new ArrayList<>(rowKeys.size());
            for (String phoneKey : rowKeys) {
                gets.add(new Get(Bytes.toBytes(phoneKey)));
            }

            Result[] results = table.get(gets);
            for (Result result : results) {
                if (result.isEmpty()) {
                    System.out.println("未查询到数据!");
                } else {
                    for (Cell cell : result.rawCells()) {
                        String family = Bytes.toString(CellUtil.cloneFamily(cell));
                        String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                        String value = Bytes.toString(CellUtil.cloneValue(cell));
                        System.out.println(family + ":" + qualifier + ":" + value);
                    }
                }
                System.out.println("===========================");
            }

            long end = System.currentTimeMillis();
            System.out.println("耗时:" + (end - start) + "ms");
        }
    }


    /**
     * Deletes column {@code base_info:address} of row {@code user001} in table
     * {@code t_user_info2}.
     *
     * <p>The table handle is opened in try-with-resources so it is released even when
     * the delete fails.
     *
     * @throws Exception if the table cannot be opened or the delete fails
     */
    @Test
    public void testDel() throws Exception {
        try (Table table = connection.getTable(TableName.valueOf("t_user_info2"))) {
            Delete delete = new Delete(Bytes.toBytes("user001"));
            delete.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("address"));
            table.delete(delete);
        }
    }

    /**
     * Scans table {@code t_user_info2} over the row-key range
     * {@code liu_sh_01} .. {@code zhang_bj_01_000} and prints every cell.
     *
     * @throws Exception if the table cannot be opened or the scan fails
     */
    @Test
    public void testScan() throws Exception {
        Scan scan = new Scan(Bytes.toBytes("liu_sh_01"), Bytes.toBytes("zhang_bj_01_000"));
        scanAndPrint(scan);
    }

    /**
     * Scans all of table {@code t_user_info2} with the given filter applied and prints
     * every cell that is returned.
     *
     * @param filter filter to install on the scan
     * @throws Exception if the table cannot be opened or the scan fails
     */
    public void testScan(Filter filter) throws Exception {
        Scan scan = new Scan();
        scan.setFilter(filter);
        scanAndPrint(scan);
    }

    /**
     * Runs {@code scan} against table {@code t_user_info2} and prints each cell as
     * {@code row<TAB>family:qualifier<TAB>value}, followed by a separator line per row.
     *
     * <p>Both the table and the scanner are closed via try-with-resources.
     *
     * @param scan fully configured scan to execute
     * @throws Exception if the table cannot be opened or the scan fails
     */
    private void scanAndPrint(Scan scan) throws Exception {
        try (Table table = connection.getTable(TableName.valueOf("t_user_info2"));
             ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                CellScanner cellScanner = result.cellScanner();
                while (cellScanner.advance()) {
                    Cell current = cellScanner.current();
                    // Slice the shared backing arrays as (array, offset, length).
                    String row = Bytes.toString(
                            current.getRowArray(), current.getRowOffset(), current.getRowLength());
                    String family = Bytes.toString(
                            current.getFamilyArray(), current.getFamilyOffset(), current.getFamilyLength());
                    String qualifier = Bytes.toString(
                            current.getQualifierArray(), current.getQualifierOffset(), current.getQualifierLength());
                    String value = Bytes.toString(
                            current.getValueArray(), current.getValueOffset(), current.getValueLength());

                    System.out.println(row + "\t" + family + ":" + qualifier + "\t" + value);
                }
                System.out.println("--------");
            }
        }
    }

    /**
     * Scans {@code t_user_info2} through a combined filter: only cells whose column family
     * starts with {@code base} AND whose qualifier starts with {@code password} pass.
     *
     * <p>Other filters that can be handed to {@code testScan(Filter)} the same way:
     * <ul>
     *   <li>row-key prefix: {@code new PrefixFilter("liu".getBytes())}</li>
     *   <li>row-key comparison:
     *       {@code new RowFilter(CompareFilter.CompareOp.LESS, new BinaryComparator("user002".getBytes()))},
     *       {@code new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("01"))}</li>
     *   <li>single column value:
     *       {@code new SingleColumnValueFilter("base_info".getBytes(), "password".getBytes(),
     *       CompareFilter.CompareOp.EQUAL, "123456".getBytes())}, optionally with
     *       {@code setFilterIfMissing(true)} so rows lacking the column are dropped as well;
     *       a {@code RegexStringComparator("^zhang")} or {@code SubstringComparator("lisi")}
     *       can replace the literal value</li>
     *   <li>column-family name:
     *       {@code new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator("info".getBytes()))}</li>
     *   <li>qualifier name:
     *       {@code new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator("username".getBytes()))},
     *       {@code new ColumnPrefixFilter("pass".getBytes())},
     *       {@code new MultipleColumnPrefixFilter(new byte[][]{"username".getBytes(), "password".getBytes()})}</li>
     * </ul>
     *
     * @throws Exception if the underlying scan fails
     */
    @Test
    public void testFilter() throws Exception {
        // Keep only column families whose name starts with "base".
        FamilyFilter byFamilyPrefix = new FamilyFilter(
                CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator("base".getBytes()));

        // Keep only qualifiers starting with "password".
        ColumnPrefixFilter byQualifierPrefix = new ColumnPrefixFilter("password".getBytes());

        // Both conditions must hold for a cell to be returned.
        FilterList combined = new FilterList(
                FilterList.Operator.MUST_PASS_ALL, byFamilyPrefix, byQualifierPrefix);

        testScan(combined);
    }

    /**
     * Pages through table {@code t_user_info2} three rows at a time using a
     * {@code PageFilter}, printing every row and finally the total row count.
     *
     * <p>Each page is a fresh scan that starts just after the last row of the previous
     * page. The table and every per-page scanner are closed via try-with-resources, so
     * nothing leaks when a scan fails.
     *
     * <p>NOTE(review): per the HBase documentation a PageFilter is applied separately on
     * each region server, so a page may contain more rows than the page size.
     *
     * @throws Exception if the table cannot be opened, a scan fails, or the pause between
     *                   pages is interrupted
     */
    @Test
    public void pageScan() throws Exception {
        // Appending a zero byte to the last row key yields the smallest key after it.
        final byte[] POSTFIX = new byte[]{0x00};
        PageFilter pageFilter = new PageFilter(3); // 3 rows per page
        byte[] lastRow = null;
        int totalRows = 0;

        try (Table table = connection.getTable(TableName.valueOf("t_user_info2"))) {
            while (true) {
                Scan scan = new Scan();
                scan.setFilter(pageFilter);
                if (lastRow != null) {
                    scan.setStartRow(Bytes.add(lastRow, POSTFIX));
                }

                int localRows = 0;
                try (ResultScanner scanner = table.getScanner(scan)) {
                    Result result;
                    while ((result = scanner.next()) != null) {
                        System.out.println(++localRows + ": " + result);
                        totalRows++;
                        lastRow = result.getRow();
                    }
                }

                // An empty page means the previous page was the last one.
                if (localRows == 0) {
                    break;
                }
                Thread.sleep(2000);
            }
        }
        System.out.println("totla rows:" + totalRows);
    }

}
